<!DOCTYPE html>



  


<html class="theme-next muse use-motion" lang="zh-Hans">
<head>
  <meta charset="UTF-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1"/>
<meta name="theme-color" content="#222">









<meta http-equiv="Cache-Control" content="no-transform" />
<meta http-equiv="Cache-Control" content="no-siteapp" />
















  
  
  <link href="/lib/fancybox/source/jquery.fancybox.css?v=2.1.5" rel="stylesheet" type="text/css" />







<link href="/lib/font-awesome/css/font-awesome.min.css?v=4.6.2" rel="stylesheet" type="text/css" />

<link href="/css/main.css?v=5.1.4" rel="stylesheet" type="text/css" />


  <link rel="apple-touch-icon" sizes="180x180" href="/images/apple-touch-icon-next.png?v=5.1.4">


  <link rel="icon" type="image/png" sizes="32x32" href="/images/favicon-32x32-next.png?v=5.1.4">


  <link rel="icon" type="image/png" sizes="16x16" href="/images/favicon-16x16-next.png?v=5.1.4">


  <link rel="mask-icon" href="/images/logo.svg?v=5.1.4" color="#222">





  <meta name="keywords" content="Hexo, NexT" />










<meta name="description" content="1、KafkaConsumer 概述根据 KafkaConsumer 类上的注释上来看 KafkaConsumer 具有如下特征：  在 Kafka 中 KafkaConsumer 是线程不安全的。  2.2.1 版本的KafkaConsumer 兼容 kafka 0.10.0 和 0.11.0 等低版本。  消息偏移量与消费偏移量(消息消费进度)Kafka 为分区中的每一条消息维护一个偏移量，即">
<meta property="og:type" content="article">
<meta property="og:title" content="初始 Kafka Consumer 消费者">
<meta property="og:url" content="https://www.codingw.net/posts/d9d4c345.html">
<meta property="og:site_name" content="中间件兴趣圈">
<meta property="og:description" content="1、KafkaConsumer 概述根据 KafkaConsumer 类上的注释上来看 KafkaConsumer 具有如下特征：  在 Kafka 中 KafkaConsumer 是线程不安全的。  2.2.1 版本的KafkaConsumer 兼容 kafka 0.10.0 和 0.11.0 等低版本。  消息偏移量与消费偏移量(消息消费进度)Kafka 为分区中的每一条消息维护一个偏移量，即">
<meta property="og:locale">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191124101227865.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191124122831676.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="article:published_time" content="2020-10-23T15:18:01.000Z">
<meta property="article:modified_time" content="2021-04-26T09:59:57.727Z">
<meta property="article:author" content="中间件兴趣圈">
<meta property="article:tag" content="中间件">
<meta name="twitter:card" content="summary">
<meta name="twitter:image" content="https://img-blog.csdnimg.cn/20191124101227865.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">



<script type="text/javascript" id="hexo.configurations">
  var NexT = window.NexT || {};
  var CONFIG = {
    root: '',
    scheme: 'Muse',
    version: '5.1.4',
    sidebar: {"position":"left","display":"post","offset":12,"b2t":false,"scrollpercent":false,"onmobile":false},
    fancybox: true,
    tabs: true,
    motion: {"enable":true,"async":false,"transition":{"post_block":"fadeIn","post_header":"slideDownIn","post_body":"slideDownIn","coll_header":"slideLeftIn","sidebar":"slideUpIn"}},
    duoshuo: {
      userId: '0',
      author: '博主'
    },
    algolia: {
      applicationID: '',
      apiKey: '',
      indexName: '',
      hits: {"per_page":10},
      labels: {"input_placeholder":"Search for Posts","hits_empty":"We didn't find any results for the search: ${query}","hits_stats":"${hits} results found in ${time} ms"}
    }
  };
</script>



  <link rel="canonical" href="https://www.codingw.net/posts/d9d4c345.html"/>





  <title>初始 Kafka Consumer 消费者 | 中间件兴趣圈</title>
  








<meta name="generator" content="Hexo 5.4.0"></head>

<body itemscope itemtype="http://schema.org/WebPage" lang="zh-Hans">

  
  
    
  

  <div class="container sidebar-position-left page-post-detail">
    <div class="headband"></div>

    <header id="header" class="header" itemscope itemtype="http://schema.org/WPHeader">
      <div class="header-inner"><div class="site-brand-wrapper">
  <div class="site-meta ">
    

    <div class="custom-logo-site-title">
      <a href="/"  class="brand" rel="start">
        <span class="logo-line-before"><i></i></span>
        <span class="site-title">中间件兴趣圈</span>
        <span class="logo-line-after"><i></i></span>
      </a>
    </div>
      
        <p class="site-subtitle">微信搜『中间件兴趣圈』，回复『Java』获取200本优质电子书</p>
      
  </div>

  <div class="site-nav-toggle">
    <button>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
    </button>
  </div>
</div>

<nav class="site-nav">
  

  
    <ul id="menu" class="menu">
      
        
        <li class="menu-item menu-item-home">
          <a href="/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-home"></i> <br />
            
            首页
          </a>
        </li>
      
        
        <li class="menu-item menu-item-categories">
          <a href="/categories/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-question-circle"></i> <br />
            
            分类
          </a>
        </li>
      
        
        <li class="menu-item menu-item-archives">
          <a href="/archives/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-question-circle"></i> <br />
            
            归档
          </a>
        </li>
      

      
    </ul>
  

  
</nav>



 </div>
    </header>

    <main id="main" class="main">
      <div class="main-inner">
        <div class="content-wrap">
          <div id="content" class="content">
            

  <div id="posts" class="posts-expand">
    

  

  
  
  

  <article class="post post-type-normal" itemscope itemtype="http://schema.org/Article">
  
  
  
  <div class="post-block">
    <link itemprop="mainEntityOfPage" href="https://www.codingw.net/posts/d9d4c345.html">

    <span hidden itemprop="author" itemscope itemtype="http://schema.org/Person">
      <meta itemprop="name" content="">
      <meta itemprop="description" content="">
      <meta itemprop="image" content="/images/avatar.gif">
    </span>

    <span hidden itemprop="publisher" itemscope itemtype="http://schema.org/Organization">
      <meta itemprop="name" content="中间件兴趣圈">
    </span>

    
      <header class="post-header">

        
        
          <h1 class="post-title" itemprop="name headline">初始 Kafka Consumer 消费者</h1>
        

        <div class="post-meta">
          <span class="post-time">
            
              <span class="post-meta-item-icon">
                <i class="fa fa-calendar-o"></i>
              </span>
              
                <span class="post-meta-item-text">发表于</span>
              
              <time title="创建于" itemprop="dateCreated datePublished" datetime="2020-10-23T23:18:01+08:00">
                2020-10-23
              </time>
            

            

            
          </span>

          
            <span class="post-category" >
            
              <span class="post-meta-divider">|</span>
            
              <span class="post-meta-item-icon">
                <i class="fa fa-folder-o"></i>
              </span>
              
                <span class="post-meta-item-text">分类于</span>
              
              
                <span itemprop="about" itemscope itemtype="http://schema.org/Thing">
                  <a href="/categories/kafka/" itemprop="url" rel="index">
                    <span itemprop="name">kafka</span>
                  </a>
                </span>

                
                
              
            </span>
          

          
            
          

          
          
             <span id="/posts/d9d4c345.html" class="leancloud_visitors" data-flag-title="初始 Kafka Consumer 消费者">
               <span class="post-meta-divider">|</span>
               <span class="post-meta-item-icon">
                 <i class="fa fa-eye"></i>
               </span>
               
                 <span class="post-meta-item-text">阅读次数&#58;</span>
               
                 <span class="leancloud-visitors-count"></span>
             </span>
          

          
            <span class="post-meta-divider">|</span>
            <span class="page-pv"><i class="fa fa-file-o"></i>
            <span class="busuanzi-value" id="busuanzi_value_page_pv" ></span>次
            </span>
          

          

          

        </div>
      </header>
    

    
    
    
    <div class="post-body" itemprop="articleBody">

      
      

      
        <div id="vip-container"><h2 id="1、KafkaConsumer-概述"><a href="#1、KafkaConsumer-概述" class="headerlink" title="1、KafkaConsumer 概述"></a>1、KafkaConsumer 概述</h2><p>根据 KafkaConsumer 类上的注释上来看 KafkaConsumer 具有如下特征：</p>
<ul>
<li><p>在 Kafka 中 KafkaConsumer 是线程不安全的。</p>
</li>
<li><p>2.2.1 版本的KafkaConsumer 兼容 kafka 0.10.0 和 0.11.0 等低版本。</p>
</li>
<li><p>消息偏移量与消费偏移量(消息消费进度)<br>Kafka 为分区中的每一条消息维护一个偏移量，即消息偏移量。这个偏移量充当该分区内记录的唯一标识符。消费偏移量(消息消费进度)存储的是消费组当前的处理进度。消息消费进度的提交在 kafka 中可以定时自动提交也可以手动提交。手动提交可以调用 ommitSync() 或 commitAsync 方法。</p>
</li>
<li><p>消费组 与 订阅关系<br>多个消费这可以同属于一个消费组，消费组内的所有消费者共同消费主题下的所有消息。一个消费组可以订阅多个主题。</p>
</li>
<li><p>队列负载机制<br>既然同一个消费组内的消费者共同承担主题下所有队列的消费，那他们如何进行分工呢？默认情况下采取平均分配，例如一个消费组有两个消费者c1、c2，一个 topic 的分区数为6，那 c1 会负责3个分区的消费，同样 c2 会负责另外3个分区的分配。</p>
<p>那如果其中一个消费者宕机或新增一个消费者，那队列能动态调整吗？</p>
<p>答案是会重新再次平衡，例如如果新增一个消费者 c3，则c1,c2,c3都会负责2个分区的消息消费，分区重平衡会在后续文章中重点介绍。消费者也可以通过 assign 方法手动指定分区，此时会禁用默认的自动分配机制。</p>
</li>
<li><p>消费者故障检测机制<br>当通过 subscribe 方法订阅某些主题时，此时该消费者还未真正加入到订阅组，只有当 consumeer#poll 方法被调用后，并且会向 broker 定时发送心跳包，如果 broker 在 session.timeout.ms 时间内未收到心跳包，则 broker 会任务该消费者已宕机，会将其剔除，并触发消费端的分区重平衡。</p>
<p>消费者也有可能遇到“活体锁”的情况，即它继续发送心跳，但没有任何进展。在这种情况下，为了防止消费者无限期地占用它的分区，可以使用max.poll.interval.ms 设置提供了一个活性检测机制。基本上，如果您调用轮询的频率低于配置的最大间隔，那么客户机将主动离开组，以便另一个消费者可以接管它的分区。当这种情况发生时,您可能会看到一个偏移提交失败(由调用{@link #commitSync()}抛出的{@link CommitFailedException}表示)。</p>
</li>
<li><p>kafka 对 poll loop 行为的控制参数<br>Kafka 提供了如下两个参数来控制 poll 的行为：</p>
<ul>
<li> max.poll.interval.ms<br>允许 两次调用 poll 方法的最大间隔，即设置每一批任务最大的处理时间。</li>
<li> max.poll.records<br>每一次 poll 最大拉取的消息条数。</li>
</ul>
<p>对于消息处理时间不可预测的情况下上述两个参数可能不够用，那将如何是好呢？</p>
<p>通常的建议将消息拉取与消息消费分开，一个线程负责 poll 消息，处理这些消息使用另外的线程，这里就需要手动提交消费进度。为了控制消息拉起的过快，您可能会需要用到 Consumer#pause(Collection) 方法，暂时停止向该分区拉起消息。RocketMQ 的推模式就是采用了这种策略。如果大家有兴趣的话，可以从笔者所著的《RocketMQ技术内幕》一书中详细了解。</p>
</li>
</ul>
<span id="more"></span>

<h2 id="2、KafkaConsume-使用示例"><a href="#2、KafkaConsume-使用示例" class="headerlink" title="2、KafkaConsume 使用示例"></a>2、KafkaConsume 使用示例</h2><h3 id="2-1-自动提交消费进度"><a href="#2-1-自动提交消费进度" class="headerlink" title="2.1 自动提交消费进度"></a>2.1 自动提交消费进度</h3><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">static</span> <span class="keyword">void</span> <span class="title">testConsumer1</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    Properties props = <span class="keyword">new</span> Properties();</span><br><span class="line">    props.setProperty(<span class="string">&quot;bootstrap.servers&quot;</span>, <span class="string">&quot;localhost:9092,localhost:9082,localhost:9072&quot;</span>);</span><br><span class="line">    props.setProperty(<span class="string">&quot;group.id&quot;</span>, <span class="string">&quot;C_ODS_ORDERCONSUME_01&quot;</span>);</span><br><span class="line">    props.setProperty(<span class="string">&quot;enable.auto.commit&quot;</span>, <span class="string">&quot;true&quot;</span>);</span><br><span class="line">    props.setProperty(<span class="string">&quot;auto.commit.interval.ms&quot;</span>, <span class="string">&quot;1000&quot;</span>);</span><br><span class="line">    props.setProperty(<span class="string">&quot;key.deserializer&quot;</span>, <span class="string">&quot;org.apache.kafka.common.serialization.StringDeserializer&quot;</span>);</span><br><span class="line">    props.setProperty(<span class="string">&quot;value.deserializer&quot;</span>, <span class="string">&quot;org.apache.kafka.common.serialization.StringDeserializer&quot;</span>);</span><br><span class="line">    KafkaConsumer&lt;String, String&gt; consumer = <span class="keyword">new</span> KafkaConsumer&lt;&gt;(props);</span><br><span class="line">    consumer.subscribe(Arrays.asList(<span class="string">&quot;TOPIC_ORDER&quot;</span>));</span><br><span class="line">    <span class="keyword">while</span> (<span class="keyword">true</span>) &#123;</span><br><span class="line">        ConsumerRecords&lt;String, String&gt;  records = consumer.poll(Duration.ofMillis(<span class="number">100</span>));</span><br><span class="line">        <span class="keyword">for</span> (ConsumerRecord&lt;String, String&gt; record : records) &#123;</span><br><span class="line">            System.out.println(<span class="string">&quot;消息消费中&quot;</span>);</span><br><span class="line">            System.out.printf(<span class="string">&quot;offset = %d, key = %s, value = %s%n&quot;</span>, record.offset(), record.key(), record.value());</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<h4 id="2-2-手动提交消费进度"><a href="#2-2-手动提交消费进度" class="headerlink" title="2.2 手动提交消费进度"></a>2.2 手动提交消费进度</h4><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">static</span> <span class="keyword">void</span> <span class="title">testConsumer2</span><span class="params">()</span> </span>&#123;</span><br><span class="line">        Properties props = <span class="keyword">new</span> Properties();</span><br><span class="line">        props.setProperty(<span class="string">&quot;bootstrap.servers&quot;</span>, <span class="string">&quot;localhost:9092&quot;</span>);</span><br><span class="line">        props.setProperty(<span class="string">&quot;group.id&quot;</span>, <span class="string">&quot;test&quot;</span>);</span><br><span class="line">        props.setProperty(<span class="string">&quot;enable.auto.commit&quot;</span>, <span class="string">&quot;false&quot;</span>);</span><br><span class="line">        props.setProperty(<span class="string">&quot;key.deserializer&quot;</span>, <span class="string">&quot;org.apache.kafka.common.serialization.StringDeserializer&quot;</span>);</span><br><span class="line">        props.setProperty(<span class="string">&quot;value.deserializer&quot;</span>, <span class="string">&quot;org.apache.kafka.common.serialization.StringDeserializer&quot;</span>);</span><br><span class="line">        KafkaConsumer&lt;String, String&gt; consumer = <span class="keyword">new</span> KafkaConsumer&lt;&gt;(props);</span><br><span class="line">        consumer.subscribe(Arrays.asList(<span class="string">&quot;foo&quot;</span>, <span class="string">&quot;bar&quot;</span>));</span><br><span class="line">        <span class="keyword">final</span> <span class="keyword">int</span> minBatchSize = <span class="number">200</span>;</span><br><span class="line">        List&lt;ConsumerRecord&lt;String, String&gt;&gt; buffer = <span class="keyword">new</span> ArrayList&lt;&gt;();</span><br><span class="line">        <span class="keyword">while</span> (<span class="keyword">true</span>) &#123;</span><br><span class="line">            ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(<span class="number">100</span>));</span><br><span class="line">            <span class="keyword">for</span> (ConsumerRecord&lt;String, String&gt; record : records) &#123;</span><br><span class="line">                buffer.add(record);</span><br><span class="line">            &#125;</span><br><span class="line">            <span class="keyword">if</span> (buffer.size() &gt;= minBatchSize) &#123;</span><br><span class="line">                <span class="comment">// insertIntoDb(buffer);</span></span><br><span class="line">                <span class="comment">// 省略处理逻辑</span></span><br><span class="line">                consumer.commitSync();</span><br><span class="line">                buffer.clear();</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br></pre></td></tr></table></figure>
<h2 id="3、认识-Consumer-接口"><a href="#3、认识-Consumer-接口" class="headerlink" title="3、认识 Consumer 接口"></a>3、认识 Consumer 接口</h2><p>要认识 Kafka 的消费者，个人认为最好的办法就是从它的类图着手，下面给出 Consumer 接口的类图。<br><img src="https://img-blog.csdnimg.cn/20191124101227865.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>接下来对起重点方法进行一个初步的介绍，从下篇文章开始将对其进行详细设计。</p>
<ul>
<li>Set&lt; TopicPartition&gt; assignment()<br>获取该消费者的队列分配列表。</li>
<li>Set&lt; String&gt; subscription()<br>获取该消费者的订阅信息。</li>
<li>void subscribe(Collection&lt; String&gt; topics)<br>订阅主题。</li>
<li>void subscribe(Collection&lt; String&gt; topics, ConsumerRebalanceListener callback)<br>订阅主题，并指定队列重平衡的监听器。</li>
<li>void assign(Collection&lt; TopicPartition&gt; partitions)<br>取代 subscription，手动指定消费哪些队列。</li>
<li>void unsubscribe()<br>取消订阅关系。</li>
<li>ConsumerRecords&lt;K, V&gt; poll(Duration timeout)<br>拉取消息，是 KafkaConsumer 的核心方法，将在下文详细介绍。</li>
<li>void commitSync()<br>同步提交消费进度，为本批次的消费提交，将在后续文章中详细介绍。</li>
<li>void commitSync(Duration timeout)<br>同步提交消费进度，可设置超时时间。</li>
<li>void commitSync(Map&lt;TopicPartition, OffsetAndMetadata&gt; offsets)<br>显示同步提交消费进度， offsets 指明需要提交消费进度的信息。</li>
<li>void commitSync(final Map&lt;TopicPartition, OffsetAndMetadata&gt; offsets, final Duration timeout)<br>显示同步提交消费进度，带超时间。</li>
<li>void seek(TopicPartition partition, long offset)<br>重置 consumer#poll 方法下一次拉消息的偏移量。</li>
<li>void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)<br>seek 方法重载方法。</li>
<li>void seekToBeginning(Collection&lt; TopicPartition&gt; partitions)<br>将 poll 方法下一次的拉取偏移量设置为队列的初始偏移量。</li>
<li>void seekToEnd(Collection&lt; TopicPartition&gt; partitions)<br>将 poll 方法下一次的拉取偏移量设置为队列的最大偏移量。</li>
<li>long position(TopicPartition partition)<br>获取将被拉取的偏移量。</li>
<li>long position(TopicPartition partition, final Duration timeout)<br>同上。</li>
<li>OffsetAndMetadata committed(TopicPartition partition)<br>获取指定分区已提交的偏移量。</li>
<li>OffsetAndMetadata committed(TopicPartition partition, final Duration timeout)<br>同上。</li>
<li>Map&lt;MetricName, ? extends Metric&gt; metrics()<br>统计指标。</li>
<li>List&lt; PartitionInfo&gt; partitionsFor(String topic)<br>获取主题的路由信息。</li>
<li>List&lt; PartitionInfo&gt; partitionsFor(String topic, Duration timeout)<br>同上。</li>
<li>Map&lt;String, List&lt; PartitionInfo&gt;&gt; listTopics()<br>获取所有 topic 的路由信息。</li>
<li>Map&lt;String, List&lt; PartitionInfo&gt;&gt; listTopics(Duration timeout)<br>同上。</li>
<li>Set&lt; TopicPartition&gt; paused()<br>获取已挂起的分区信息。</li>
<li>void pause(Collection&lt; TopicPartition&gt; partitions)<br>挂起分区，下一次 poll 方法将不会返回这些分区的消息。</li>
<li>void resume(Collection&lt; TopicPartition&gt; partitions)<br>恢复挂起的分区。</li>
<li>Map&lt;TopicPartition, OffsetAndTimestamp&gt; offsetsForTimes(Map&lt;TopicPartition, Long&gt; timestampsToSearch)<br>根据时间戳查找最近的一条消息的偏移量。</li>
<li>Map&lt;TopicPartition, OffsetAndTimestamp&gt; offsetsForTimes(Map&lt;TopicPartition, Long&gt; timestampsToSearch, Duration timeout)<br>同上。</li>
<li>Map&lt;TopicPartition, Long&gt; beginningOffsets(Collection&lt; TopicPartition&gt; partitions)<br>查询指定分区当前最小的偏移量。</li>
<li>Map&lt;TopicPartition, Long&gt; beginningOffsets(Collection&lt; TopicPartition&gt; partitions, Duration timeout)<br>同上。</li>
<li>Map&lt;TopicPartition, Long&gt; endOffsets(Collection&lt; TopicPartition&gt; partitions)<br>查询指定分区当前最大的偏移量。</li>
<li>Map&lt;TopicPartition, Long&gt; endOffsets(Collection&lt; TopicPartition&gt; partitions, Duration timeout)<br>同上。</li>
<li>void close()<br>关闭消费者。</li>
<li>void close(Duration timeout)<br>关闭消费者。</li>
<li>void wakeup()<br>唤醒消费者。</li>
</ul>
<h2 id="4、初始-KafkaConsumer"><a href="#4、初始-KafkaConsumer" class="headerlink" title="4、初始 KafkaConsumer"></a>4、初始 KafkaConsumer</h2><p><img src="https://img-blog.csdnimg.cn/20191124122831676.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>接下来笔者根据其构造函数，对一一介绍其核心属性的含义，为接下来讲解其核心方法打下基础。</p>
<ul>
<li>String groupId<br>消费组ID。同一个消费组内的多个消费者共同消费一个主题下的消息。</li>
<li>String clientId<br>发出请求时传递给服务器的id字符串。设置该值的目的是方便在服务器端请求日志中包含逻辑应用程序名称，从而能够跟踪ip/端口之外的请求源。该值可以设置为应用名称。</li>
<li>ConsumerCoordinator coordinator<br>消费协调器，后续会详细介绍。</li>
<li>Deserializer&lt; K&gt; keyDeserializer<br>key 序列化器。</li>
<li>Deserializer&lt; V&gt; valueDeserializer<br>值序列化器。</li>
<li>ConsumerNetworkClient client<br>网络通讯客户端。</li>
<li>SubscriptionState subscriptions<br>用于管理订阅状态的类，用于跟踪 topics, partitions, offsets 等信息。后续会详细介绍。</li>
<li>ConsumerMetadata metadata<br>消费者元数据信息，包含路由信息。</li>
<li>long retryBackoffMs<br>如果向 broker 发送请求失败后，发起重试之前需要等待的间隔时间，通过属性 retry.backoff.ms　指定。</li>
<li>long requestTimeoutMs<br>一次请求的超时时间。</li>
<li>int defaultApiTimeoutMs<br>为所有可能阻塞的API设置一个默认的超时时间。</li>
<li>List&lt; PartitionAssignor&gt; assignors<br>分区分配算法（分区负载算法）。</li>
</ul>
<p>Kafka Consumer 消费者就介绍到这里了，从下篇文章开始将开始详细介绍 Kafka 关于消息消费的方方面面。</p>
</div>

			<script src="https://my.openwrite.cn/js/readmore.js" type="text/javascript"></script>
			<script>
			var isMobile = navigator.userAgent.match(/(phone|pad|pod|iPhone|iPod|ios|iPad|Android|Mobile|BlackBerry|IEMobile|MQQBrowser|JUC|Fennec|wOSBrowser|BrowserNG|WebOS|Symbian|Windows Phone)/i);
			if (!isMobile) {
			    var btw = new BTWPlugin();
			    btw.init({
			        "id": "vip-container",
			        "blogId": "18019-1573088808868-542",
			        "name": "中间件兴趣圈",
			        "qrcode": "https://img-blog.csdnimg.cn/20190314214003962.jpg",
			        "keyword": "more"
			    });
			}
			</script>
		
      
    </div>
    
    
    

    

    

    

    <footer class="post-footer">
      

      
      
      

      
        <div class="post-nav">
          <div class="post-nav-next post-nav-item">
            
              <a href="/posts/497923c7.html" rel="next" title="源码分析Kafka 消息拉取流程">
                <i class="fa fa-chevron-left"></i> 源码分析Kafka 消息拉取流程
              </a>
            
          </div>

          <span class="post-nav-divider"></span>

          <div class="post-nav-prev post-nav-item">
            
              <a href="/posts/b579d244.html" rel="prev" title="初识 Kafka Producer 生产者">
                初识 Kafka Producer 生产者 <i class="fa fa-chevron-right"></i>
              </a>
            
          </div>
        </div>
      

      
      
    </footer>
  </div>
  
  
  
  </article>



    <div class="post-spread">
      
    </div>
  </div>


          </div>
          


          

  



        </div>
        
          
  
  <div class="sidebar-toggle">
    <div class="sidebar-toggle-line-wrap">
      <span class="sidebar-toggle-line sidebar-toggle-line-first"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-middle"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-last"></span>
    </div>
  </div>

  <aside id="sidebar" class="sidebar">
    
    <div class="sidebar-inner">

      

      
        <ul class="sidebar-nav motion-element">
          <li class="sidebar-nav-toc sidebar-nav-active" data-target="post-toc-wrap">
            文章目录
          </li>
          <li class="sidebar-nav-overview" data-target="site-overview-wrap">
            站点概览
          </li>
        </ul>
      

      <section class="site-overview-wrap sidebar-panel">
        <div class="site-overview">
          <div class="site-author motion-element" itemprop="author" itemscope itemtype="http://schema.org/Person">
            
              <p class="site-author-name" itemprop="name"></p>
              <p class="site-description motion-element" itemprop="description"></p>
          </div>

          <nav class="site-state motion-element">

            
              <div class="site-state-item site-state-posts">
              
                <a href="/archives/">
              
                  <span class="site-state-item-count">139</span>
                  <span class="site-state-item-name">日志</span>
                </a>
              </div>
            

            
              
              
              <div class="site-state-item site-state-categories">
                <a href="/categories/index.html">
                  <span class="site-state-item-count">18</span>
                  <span class="site-state-item-name">分类</span>
                </a>
              </div>
            

            

          </nav>

          

          

          
          

          
          

          

        </div>
      </section>

      
      <!--noindex-->
        <section class="post-toc-wrap motion-element sidebar-panel sidebar-panel-active">
          <div class="post-toc">

            
              
            

            
              <div class="post-toc-content"><ol class="nav"><li class="nav-item nav-level-2"><a class="nav-link" href="#1%E3%80%81KafkaConsumer-%E6%A6%82%E8%BF%B0"><span class="nav-number">1.</span> <span class="nav-text">1、KafkaConsumer 概述</span></a></li><li class="nav-item nav-level-2"><a class="nav-link" href="#2%E3%80%81KafkaConsume-%E4%BD%BF%E7%94%A8%E7%A4%BA%E4%BE%8B"><span class="nav-number">2.</span> <span class="nav-text">2、KafkaConsume 使用示例</span></a><ol class="nav-child"><li class="nav-item nav-level-3"><a class="nav-link" href="#2-1-%E8%87%AA%E5%8A%A8%E6%8F%90%E4%BA%A4%E6%B6%88%E8%B4%B9%E8%BF%9B%E5%BA%A6"><span class="nav-number">2.1.</span> <span class="nav-text">2.1 自动提交消费进度</span></a><ol class="nav-child"><li class="nav-item nav-level-4"><a class="nav-link" href="#2-2-%E6%89%8B%E5%8A%A8%E6%8F%90%E4%BA%A4%E6%B6%88%E8%B4%B9%E8%BF%9B%E5%BA%A6"><span class="nav-number">2.1.1.</span> <span class="nav-text">2.2 手动提交消费进度</span></a></li></ol></li></ol></li><li class="nav-item nav-level-2"><a class="nav-link" href="#3%E3%80%81%E8%AE%A4%E8%AF%86-Consumer-%E6%8E%A5%E5%8F%A3"><span class="nav-number">3.</span> <span class="nav-text">3、认识 Consumer 接口</span></a></li><li class="nav-item nav-level-2"><a class="nav-link" href="#4%E3%80%81%E5%88%9D%E5%A7%8B-KafkaConsumer"><span class="nav-number">4.</span> <span class="nav-text">4、初始 KafkaConsumer</span></a></li></ol></div>
            

          </div>
        </section>
      <!--/noindex-->
      

      

    </div>
  </aside>


        
      </div>
    </main>

    <footer id="footer" class="footer">
      <div class="footer-inner">
        <div class="copyright">&copy; <span itemprop="copyrightYear">2021</span>
  <span class="with-love">
    <i class="fa fa-user"></i>
  </span>
  <span class="author" itemprop="copyrightHolder">中间件兴趣圈</span>

  
</div>


  <div class="powered-by">由 <a class="theme-link" target="_blank" href="https://hexo.io">Hexo</a> 强力驱动</div>



  <span class="post-meta-divider">|</span>



  <div class="theme-info">主题 &mdash; <a class="theme-link" target="_blank" href="https://github.com/iissnan/hexo-theme-next">NexT.Muse</a> v5.1.4</div>




        
<div class="busuanzi-count">
  <script async src="https://dn-lbstatics.qbox.me/busuanzi/2.3/busuanzi.pure.mini.js"></script>

  
    <span class="site-uv">
      <i class="fa fa-user"></i>
      <span class="busuanzi-value" id="busuanzi_value_site_uv"></span>
      
    </span>
  

  
    <span class="site-pv">
      <i class="fa fa-eye"></i>
      <span class="busuanzi-value" id="busuanzi_value_site_pv"></span>
      
    </span>
  
</div>








        
      </div>
    </footer>

    
      <div class="back-to-top">
        <i class="fa fa-arrow-up"></i>
        
      </div>
    

    

  </div>

  

<script type="text/javascript">
  if (Object.prototype.toString.call(window.Promise) !== '[object Function]') {
    window.Promise = null;
  }
</script>









  












  
  
    <script type="text/javascript" src="/lib/jquery/index.js?v=2.1.3"></script>
  

  
  
    <script type="text/javascript" src="/lib/fastclick/lib/fastclick.min.js?v=1.0.6"></script>
  

  
  
    <script type="text/javascript" src="/lib/jquery_lazyload/jquery.lazyload.js?v=1.9.7"></script>
  

  
  
    <script type="text/javascript" src="/lib/velocity/velocity.min.js?v=1.2.1"></script>
  

  
  
    <script type="text/javascript" src="/lib/velocity/velocity.ui.min.js?v=1.2.1"></script>
  

  
  
    <script type="text/javascript" src="/lib/fancybox/source/jquery.fancybox.pack.js?v=2.1.5"></script>
  


  


  <script type="text/javascript" src="/js/src/utils.js?v=5.1.4"></script>

  <script type="text/javascript" src="/js/src/motion.js?v=5.1.4"></script>



  
  

  
  <script type="text/javascript" src="/js/src/scrollspy.js?v=5.1.4"></script>
<script type="text/javascript" src="/js/src/post-details.js?v=5.1.4"></script>



  


  <script type="text/javascript" src="/js/src/bootstrap.js?v=5.1.4"></script>



  


  




	





  





  












  





  

  
  <script src="https://cdn1.lncld.net/static/js/av-core-mini-0.6.4.js"></script>
  <script>AV.initialize("NNEhOL0iOcflg8f1U3HUqiCq-gzGzoHsz", "7kSmkbbb3DktmHALlShDsBUF");</script>
  <script>
    function showTime(Counter) {
      var query = new AV.Query(Counter);
      var entries = [];
      var $visitors = $(".leancloud_visitors");

      $visitors.each(function () {
        entries.push( $(this).attr("id").trim() );
      });

      query.containedIn('url', entries);
      query.find()
        .done(function (results) {
          var COUNT_CONTAINER_REF = '.leancloud-visitors-count';

          if (results.length === 0) {
            $visitors.find(COUNT_CONTAINER_REF).text(0);
            return;
          }

          for (var i = 0; i < results.length; i++) {
            var item = results[i];
            var url = item.get('url');
            var time = item.get('time');
            var element = document.getElementById(url);

            $(element).find(COUNT_CONTAINER_REF).text(time);
          }
          for(var i = 0; i < entries.length; i++) {
            var url = entries[i];
            var element = document.getElementById(url);
            var countSpan = $(element).find(COUNT_CONTAINER_REF);
            if( countSpan.text() == '') {
              countSpan.text(0);
            }
          }
        })
        .fail(function (object, error) {
          console.log("Error: " + error.code + " " + error.message);
        });
    }

    function addCount(Counter) {
      var $visitors = $(".leancloud_visitors");
      var url = $visitors.attr('id').trim();
      var title = $visitors.attr('data-flag-title').trim();
      var query = new AV.Query(Counter);

      query.equalTo("url", url);
      query.find({
        success: function(results) {
          if (results.length > 0) {
            var counter = results[0];
            counter.fetchWhenSave(true);
            counter.increment("time");
            counter.save(null, {
              success: function(counter) {
                var $element = $(document.getElementById(url));
                $element.find('.leancloud-visitors-count').text(counter.get('time'));
              },
              error: function(counter, error) {
                console.log('Failed to save Visitor num, with error message: ' + error.message);
              }
            });
          } else {
            var newcounter = new Counter();
            /* Set ACL */
            var acl = new AV.ACL();
            acl.setPublicReadAccess(true);
            acl.setPublicWriteAccess(true);
            newcounter.setACL(acl);
            /* End Set ACL */
            newcounter.set("title", title);
            newcounter.set("url", url);
            newcounter.set("time", 1);
            newcounter.save(null, {
              success: function(newcounter) {
                var $element = $(document.getElementById(url));
                $element.find('.leancloud-visitors-count').text(newcounter.get('time'));
              },
              error: function(newcounter, error) {
                console.log('Failed to create');
              }
            });
          }
        },
        error: function(error) {
          console.log('Error:' + error.code + " " + error.message);
        }
      });
    }

    $(function() {
      var Counter = AV.Object.extend("Counter");
      if ($('.leancloud_visitors').length == 1) {
        addCount(Counter);
      } else if ($('.post-title-link').length > 1) {
        showTime(Counter);
      }
    });
  </script>



  

  

  
  

  

  

  

</body>
</html>
